/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.api.service.impl;

import com.chinamobile.cmss.lakehouse.api.dto.DataSourceBean;
import com.chinamobile.cmss.lakehouse.api.dto.DataSourceKey;
import com.chinamobile.cmss.lakehouse.api.service.LinkService;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.ConnectionConfig;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.ConnectionWriterConfig;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.DBParam;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.DataSourceConf;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.DataSourceWriterConf;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.FieldConf;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.ReaderConfig;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.WriterConfig;
import com.chinamobile.cmss.lakehouse.common.enums.DBTableType;
import com.chinamobile.cmss.lakehouse.common.enums.DataSourceTypeEnum;
import com.chinamobile.cmss.lakehouse.common.exception.BaseException;
import com.chinamobile.cmss.lakehouse.dao.entity.DataSourceEntity;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Types;
import java.text.MessageFormat;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.support.rowset.SqlRowSet;
import org.springframework.jdbc.support.rowset.SqlRowSetMetaData;
import org.springframework.stereotype.Service;

@Slf4j
@Service("linkdDBService")
public class LinkDBServiceImpl implements LinkService {
    private static final String MYSQL_TEMPLATE_URL = "jdbc:mysql://{0}:{1}";
    private static final String DEFAULT_SHOW_TABLES = "SHOW TABLES";
    private static final String DEFAULT_CHECK_TABLE = "SELECT * FROM ";
    private static final String DEFAULT_METASQL = "select * from %s";
    private static final Set<String> MYSQL_DEFAULT_DB =
            Sets.newHashSet("information_schema", "mysql", "performance_schema", "sys");

    private LoadingCache<DataSourceKey, HikariDataSource> dataSourceCache;

    @PostConstruct
    public void init() {
        dataSourceCache = CacheBuilder.newBuilder()
                .concurrencyLevel(100)
                .initialCapacity(50)
                .expireAfterAccess(3, TimeUnit.MINUTES)
                .maximumSize(100)
                .removalListener((RemovalListener<DataSourceKey, HikariDataSource>) notification -> {
                    HikariDataSource dataSource = notification.getValue();
                    if (dataSource != null) {
                        try {
                            dataSource.close();
                        } catch (Exception e) {
                            log.error("Datasource [{}] close err.", notification.getKey(), e);
                        }
                    }
                })
                .build(new CacheLoader<DataSourceKey, HikariDataSource>() {
                    @Override
                    public HikariDataSource load(DataSourceKey dataSourceKey) {
                        HikariDataSource dataSource = new HikariDataSource();
                        dataSource.setDriverClassName(dataSourceKey.getDataSourceType().getDriverName());
                        dataSource.setUsername(dataSourceKey.getUsername());
                        dataSource.setPassword(dataSourceKey.getPassword());
                        dataSource.setJdbcUrl(dataSourceKey.getUrl());
                        dataSource.setMinimumIdle(5);
                        dataSource.setMaximumPoolSize(15);
                        dataSource.setAutoCommit(true);
                        dataSource.setIdleTimeout(60000);
                        dataSource.setMaxLifetime(1800000);
                        dataSource.setConnectionTimeout(30000);

                        return dataSource;
                    }
                });
    }

    @Override
    public String appendUrl(DataSourceBean dataSourceBean) {
        String ip = dataSourceBean.getIp();
        String port = String.valueOf(dataSourceBean.getPort());
        if (StringUtils.isAnyBlank(ip, port)) {
            throw new BaseException("Ip and port can't be null.");
        }

        return MessageFormat.format(MYSQL_TEMPLATE_URL, ip, port);
    }

    @Override
    public Boolean isConnected(DataSourceEntity dataSourceEntity) {
        String username = dataSourceEntity.getUsername();
        String password = dataSourceEntity.getPassword();
        String url = dataSourceEntity.getUrl();

        if (StringUtils.isAnyBlank(username, password, url)) {
            return Boolean.FALSE;
        }

        String driverName = dataSourceEntity.getDataSourceType().getDriverName();
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            return Boolean.FALSE;
        }

        try (Connection connection = DriverManager.getConnection(url, username, password)) {
            if (connection == null) {
                return Boolean.FALSE;
            }
        } catch (SQLException e) {
            return Boolean.FALSE;
        }

        return Boolean.TRUE;
    }

    @Override
    public List<String> queryTablePath(DataSourceEntity dataSourceEntity, String param) {
        List<String> syncTables = Lists.newArrayList();
        DBParam dbParam = JSONObject.parseObject(param, DBParam.class);

        if (dbParam.getDbTableType() == DBTableType.SELECTED_TABLES) {
            syncTables.addAll(dbParam.getTables());
            return syncTables;
        }

        String showTableSql = DEFAULT_SHOW_TABLES;
        Optional<JdbcTemplate> jdbcTemplate = getJdbcTemplate(dataSourceEntity);
        jdbcTemplate.get().execute("USE " + dbParam.getDataBase());
        syncTables = jdbcTemplate.get().query(showTableSql, (rs, rowNum) -> rs.getString(1));
        syncTables.removeAll(MYSQL_DEFAULT_DB);

        return syncTables;
    }

    @Override
    public ReaderConfig getReaderConfig(DataSourceEntity dataSourceEntity) {
        ReaderConfig readerConfig = new ReaderConfig();
        readerConfig.setParameter(buildDBConfig(dataSourceEntity));
        readerConfig.setName(getReaderName(dataSourceEntity.getDataSourceType()));

        return readerConfig;
    }

    @Override
    public WriterConfig getWriterConfig(DataSourceEntity dataSourceEntity) {
        WriterConfig writerConfig = new WriterConfig();
        writerConfig.setParameter(buildDBWriterConfig(dataSourceEntity));
        writerConfig.setName(getWriterName(dataSourceEntity.getDataSourceType()));

        return writerConfig;
    }

    @Override
    public String getTablePathSchema(DataSourceTypeEnum dataSourceType, String param) {
        DBParam dbParam = JSONObject.parseObject(param, DBParam.class);
        return dbParam.getDataBase();
    }

    @Override
    public List<FieldConf> getMetadata(DataSourceEntity dataSourceEntity, String table, String param) {
        List<FieldConf> tableColumns = org.apache.commons.compress.utils.Lists.newArrayList();
        Optional<JdbcTemplate> jdbcTemplate = getJdbcTemplate(dataSourceEntity);

        SqlRowSet rowSet = jdbcTemplate.get().queryForRowSet(DEFAULT_METASQL);
        SqlRowSetMetaData metaData = rowSet.getMetaData();
        int columnCount = metaData.getColumnCount();
        for (int i = 1; i <= columnCount; i++) {
            FieldConf metaColumn = new FieldConf();
            String columnName = metaData.getColumnName(i).toLowerCase();
            int columnType = metaData.getColumnType(i);
            metaColumn.setName(columnName);
            metaColumn.setIndex(i);
            metaColumn.setType(toHiveType("", columnType));

            tableColumns.add(metaColumn);
        }

        return tableColumns;
    }

    @Override
    public void updateReaderConfig(ReaderConfig readerConfig, String tablePath, List<FieldConf> columns, String sourceSchema) {
        DataSourceConf parameterConfig = (DataSourceConf) readerConfig.getParameter();
        ConnectionConfig connectionConfig = parameterConfig.getConnection().get(0);
        connectionConfig.setTable(Lists.newArrayList(tablePath));
        connectionConfig.setSchema(sourceSchema);
        parameterConfig.setColumn(columns);
    }

    @Override
    public Boolean checkTablePathExist(DataSourceEntity dataSourceEntity, String schema, String tablePath, String userName) {
        Optional<JdbcTemplate> jdbcTemplate = getJdbcTemplate(dataSourceEntity);
        try {
            SqlRowSet rowSet = jdbcTemplate.get().queryForRowSet(DEFAULT_CHECK_TABLE + tablePath);
            return rowSet.getRow() > 0;
        } catch (Exception e) {
            log.warn("Table [{}] not exist. [{}]", tablePath, e.getMessage());
            return Boolean.FALSE;
        }
    }

    @Override
    public void updateWriteConfig(WriterConfig writerConfig, String tablePath, List<FieldConf> columns, String schema) {
        DataSourceWriterConf dataSourceConf = (DataSourceWriterConf) writerConfig.getParameter();
        ConnectionWriterConfig connectionConfig = dataSourceConf.getConnection().get(0);
        connectionConfig.setTable(Lists.newArrayList(tablePath));
        connectionConfig.setSchema(schema);
        dataSourceConf.setColumn(columns);
    }

    public Optional<JdbcTemplate> getJdbcTemplate(DataSourceEntity dataSourceEntity) {
        HikariDataSource dataSource;
        Optional<JdbcTemplate> optional = Optional.empty();
        try {
            dataSource = dataSourceCache.get(new DataSourceKey(dataSourceEntity));
        } catch (ExecutionException e) {
            log.warn("Datasource err", e);
            return optional;
        }

        optional = Optional.ofNullable(new JdbcTemplate(dataSource));
        return optional;
    }

    private DataSourceConf buildDBConfig(DataSourceEntity dataSourceEntity) {
        ConnectionConfig connectionConfig = new ConnectionConfig();
        connectionConfig.setJdbcUrl(Lists.newArrayList(dataSourceEntity.getUrl()));

        DataSourceConf parameterConfig = new DataSourceConf();
        parameterConfig.setUsername(dataSourceEntity.getUsername());
        parameterConfig.setPassword(dataSourceEntity.getPassword());
        parameterConfig.setConnection(Lists.newArrayList(connectionConfig));
        return parameterConfig;
    }

    private DataSourceWriterConf buildDBWriterConfig(DataSourceEntity dataSourceEntity) {
        ConnectionWriterConfig connectionConfig = new ConnectionWriterConfig();
        connectionConfig.setJdbcUrl(dataSourceEntity.getUrl());

        DataSourceWriterConf parameterConfig = new DataSourceWriterConf();
        parameterConfig.setUsername(dataSourceEntity.getUsername());
        parameterConfig.setPassword(dataSourceEntity.getPassword());
        parameterConfig.setConnection(Lists.newArrayList(connectionConfig));
        return parameterConfig;
    }

    public static String toHiveType(String sqlTypeName, int sqlType) {
        switch (sqlType) {
            case Types.INTEGER:
            case Types.SMALLINT:
                return "INT";
            case Types.VARCHAR:
            case Types.CHAR:
            case Types.LONGVARCHAR:
            case Types.NVARCHAR:
            case Types.NCHAR:
            case Types.LONGNVARCHAR:
            case Types.CLOB:
                return "STRING";
            case Types.FLOAT:
            case Types.DOUBLE:
            case Types.REAL:
                return "DOUBLE";
            case Types.NUMERIC:
            case Types.DECIMAL:
                return "DOUBLE";
            case Types.BIT:
            case Types.BOOLEAN:
                return "BOOLEAN";
            case Types.TINYINT:
                return "INT";
            case Types.BIGINT:
                return "BIGINT";
            case Types.DATE:
                return "DATE";
            case Types.TIME:
            case Types.TIMESTAMP:
                return "TIMESTAMP";
            default:
                if ("datetime".equalsIgnoreCase(sqlTypeName)) {
                    return "TIMESTAMP";
                } else {
                    return "STRING";
                }
        }
    }
}
